package services

import (
	"fmt"
	"math"
	"strings"
	"sync"
	"time"

	datastructure "github.com/duke-git/lancet/v2/datastructure/set"
	"github.com/duke-git/lancet/v2/maputil"
	"github.com/leandro-lugaresi/hub"
	"github.com/muety/wakapi/config"
	"github.com/muety/wakapi/repositories"
	"github.com/muety/wakapi/utils"
	"github.com/patrickmn/go-cache"

	"github.com/muety/wakapi/models"
)

type HeartbeatService struct {
	config              *config.Config
	cache               *cache.Cache
	eventBus            *hub.Hub
	repository          repositories.IHeartbeatRepository
	languageMappingSrvc ILanguageMappingService
	entityCacheLock     *sync.RWMutex
}

func NewHeartbeatService(heartbeatRepo repositories.IHeartbeatRepository, languageMappingService ILanguageMappingService) *HeartbeatService {
	srv := &HeartbeatService{
		config:              config.Get(),
		cache:               cache.New(24*time.Hour, 24*time.Hour),
		eventBus:            config.EventBus(),
		repository:          heartbeatRepo,
		languageMappingSrvc: languageMappingService,
		entityCacheLock:     &sync.RWMutex{},
	}

	// using event hub is an unnecessary indirection here, however, we might
	// potentially need heartbeat events elsewhere throughout the application some time
	// so it's more consistent to already have it this way
	sub1 := srv.eventBus.Subscribe(0, config.EventHeartbeatCreate)
	go func(sub *hub.Subscription) {
		for m := range sub.Receiver {
			heartbeat := m.Fields[config.FieldPayload].(*models.Heartbeat)
			srv.cache.IncrementInt64(srv.countByUserCacheKey(heartbeat.UserID), 1) // increment doesn't update expiration time
			srv.cache.IncrementInt64(srv.countTotalCacheKey(), 1)
			srv.checkInvalidateProjectStatsCache(heartbeat)
			srv.checkInvalidateRangeCache(heartbeat)
		}
	}(&sub1)

	return srv
}

func (srv *HeartbeatService) Insert(heartbeat *models.Heartbeat) error {
	go srv.updateEntityUserCacheByHeartbeat(heartbeat)
	return srv.repository.InsertBatch([]*models.Heartbeat{heartbeat})
}

func (srv *HeartbeatService) InsertBatch(heartbeats []*models.Heartbeat) error {
	if len(heartbeats) == 0 {
		return nil
	}

	hashes := datastructure.New[string]()

	// https://github.com/muety/wakapi/issues/139
	filteredHeartbeats := make([]*models.Heartbeat, 0, len(heartbeats))
	for _, hb := range heartbeats {
		if !hashes.Contain(hb.Hash) {
			hb = hb.Sanitize()
			filteredHeartbeats = append(filteredHeartbeats, hb)
			hashes.Add(hb.Hash)
		}
		go srv.updateEntityUserCacheByHeartbeat(hb)
	}

	err := srv.repository.InsertBatch(filteredHeartbeats)
	if err == nil {
		go srv.notifyBatch(filteredHeartbeats)
	}
	return err
}

func (srv *HeartbeatService) Count(approximate bool) (int64, error) {
	result, ok := srv.cache.Get(srv.countTotalCacheKey())
	if ok {
		return result.(int64), nil
	}
	count, err := srv.repository.Count(approximate)
	if err == nil {
		srv.cache.Set(srv.countTotalCacheKey(), count, srv.countCacheTtl())
	}
	return count, err
}

func (srv *HeartbeatService) CountByUser(user *models.User) (int64, error) {
	key := srv.countByUserCacheKey(user.ID)
	result, ok := srv.cache.Get(key)
	if ok {
		return result.(int64), nil
	}
	count, err := srv.repository.CountByUser(user)
	if err == nil {
		srv.cache.Set(key, count, srv.countCacheTtl())
	}
	return count, err
}

func (srv *HeartbeatService) CountByUsers(users []*models.User) ([]*models.CountByUser, error) {
	missingUsers := make([]*models.User, 0, len(users))
	userCounts := make([]*models.CountByUser, 0, len(users))

	for _, u := range users {
		key := srv.countByUserCacheKey(u.ID)
		result, ok := srv.cache.Get(key)
		if ok {
			userCounts = append(userCounts, &models.CountByUser{User: u.ID, Count: result.(int64)})
		} else {
			missingUsers = append(missingUsers, u)
		}
	}

	counts, err := srv.repository.CountByUsers(missingUsers)
	if err != nil {
		return nil, err
	}

	for _, uc := range counts {
		key := srv.countByUserCacheKey(uc.User)
		srv.cache.Set(key, uc.Count, srv.countCacheTtl())
		userCounts = append(userCounts, uc)
	}

	return userCounts, nil
}

func (srv *HeartbeatService) GetAllWithin(from, to time.Time, user *models.User) ([]*models.Heartbeat, error) {
	heartbeats, err := srv.repository.GetWithin(from, to, user)
	if err != nil {
		return nil, err
	}
	return srv.augmented(heartbeats, user.ID)
}

func (srv *HeartbeatService) StreamAllWithin(from, to time.Time, user *models.User) (chan *models.Heartbeat, error) {
	languageMapping, err := srv.languageMappingSrvc.ResolveByUser(user.ID)
	if err != nil {
		return nil, err
	}

	c, err := srv.repository.StreamWithin(from, to, user)
	if err != nil {
		return nil, err
	}
	return srv.augmentedAsync(c, languageMapping)
}

func (srv *HeartbeatService) GetAllWithinByFilters(from, to time.Time, user *models.User, filters *models.Filters) ([]*models.Heartbeat, error) {
	heartbeats, err := srv.repository.GetAllWithinByFilters(from, to, user, srv.filtersToColumnMap(filters))
	if err != nil {
		return nil, err
	}
	return srv.augmented(heartbeats, user.ID)
}

func (srv *HeartbeatService) StreamAllWithinByFilters(from, to time.Time, user *models.User, filters *models.Filters) (chan *models.Heartbeat, error) {
	languageMapping, err := srv.languageMappingSrvc.ResolveByUser(user.ID)
	if err != nil {
		return nil, err
	}

	c, err := srv.repository.StreamWithinByFilters(from, to, user, srv.filtersToColumnMap(filters))
	if err != nil {
		return nil, err
	}
	return srv.augmentedAsync(c, languageMapping)
}

func (srv *HeartbeatService) GetLatestByUser(user *models.User) (*models.Heartbeat, error) {
	return srv.repository.GetLatestByUser(user)
}

func (srv *HeartbeatService) GetLatestByOriginAndUser(origin string, user *models.User) (*models.Heartbeat, error) {
	return srv.repository.GetLatestByOriginAndUser(origin, user)
}

func (srv *HeartbeatService) GetLatestByFilters(user *models.User, filters *models.Filters) (*models.Heartbeat, error) {
	return srv.repository.GetLatestByFilters(user, srv.filtersToColumnMap(filters))
}

func (srv *HeartbeatService) GetFirstAll() ([]*models.TimeByUser, error) {
	return srv.repository.GetFirstAll()
}

func (srv *HeartbeatService) GetLastAll() ([]*models.TimeByUser, error) {
	return srv.repository.GetLastAll()
}

func (srv *HeartbeatService) GetFirstByUser(user *models.User) (time.Time, error) {
	cacheKey := srv.getUserFirstCacheKey(user.ID)
	if result, found := srv.cache.Get(cacheKey); found {
		return result.(time.Time), nil
	}

	result, err := srv.repository.GetRangeByUser(user)
	if err != nil {
		return time.Time{}, err
	}

	srv.cache.Set(cacheKey, result.First.T(), cache.NoExpiration)
	return result.First.T(), nil
}

func (srv *HeartbeatService) GetLastByUser(user *models.User) (time.Time, error) {
	cacheKey := srv.getUserLastCacheKey(user.ID)
	if result, found := srv.cache.Get(cacheKey); found {
		return result.(time.Time), nil
	}

	result, err := srv.repository.GetRangeByUser(user)
	if err != nil {
		return time.Time{}, err
	}

	srv.cache.Set(cacheKey, result.Last.T(), cache.NoExpiration)
	return result.Last.T(), nil
}

func (srv *HeartbeatService) GetRangeByUser(user *models.User) (*models.RangeByUser, error) {
	return srv.repository.GetRangeByUser(user)
}

func (srv *HeartbeatService) GetEntitySetByUser(entityType uint8, userId string) ([]string, error) {
	cacheKey := srv.getEntityUserCacheKey(entityType, userId)
	if results, found := srv.cache.Get(cacheKey); found {
		srv.entityCacheLock.RLock()
		defer srv.entityCacheLock.RUnlock()
		return results.(datastructure.Set[string]).Values(), nil
	}

	results, err := srv.repository.GetEntitySetByUser(entityType, userId)
	if err != nil {
		return nil, err
	}

	filtered := make([]string, 0, len(results))
	for _, r := range results {
		if strings.TrimSpace(r) != "" {
			filtered = append(filtered, r)
		}
	}

	srv.cache.Set(cacheKey, datastructure.New(filtered...), cache.NoExpiration)
	return filtered, nil
}

func (srv *HeartbeatService) DeleteBefore(t time.Time) error {
	go srv.cache.Flush()
	return srv.repository.DeleteBefore(t)
}

func (srv *HeartbeatService) DeleteByUser(user *models.User) error {
	go srv.cache.Flush()
	return srv.repository.DeleteByUser(user)
}

func (srv *HeartbeatService) DeleteByUserBefore(user *models.User, t time.Time) error {
	go srv.cache.Flush()
	return srv.repository.DeleteByUserBefore(user, t)
}

func (srv *HeartbeatService) GetUserProjectStats(user *models.User, from, to time.Time, pageParams *utils.PageParams, skipCache bool) ([]*models.ProjectStats, error) {
	// for projects page, call this like: GetUserProjectStats(&models.User{ID: "n1try"}, time.Time{}, utils.BeginOfToday(time.Local), false)

	var (
		limit  = math.MaxInt32
		offset = 0
	)

	if pageParams != nil {
		limit = pageParams.Limit()
		offset = pageParams.Offset()
	}

	cacheKey := fmt.Sprintf("project_stats_%s_%d_%d_%d_%d", user.ID, from.Unix(), to.Unix(), limit, offset)
	if results, found := srv.cache.Get(cacheKey); found && !skipCache {
		return results.([]*models.ProjectStats), nil
	} else if results, found := srv.cache.Get(fmt.Sprintf("project_stats_%s_%d_%d_%d_%d", user.ID, from.Unix(), to.Unix(), math.MaxInt32, 0)); found && !skipCache {
		return utils.SubSlice[*models.ProjectStats](results.([]*models.ProjectStats), uint(offset), uint(offset+limit)), nil
	}

	if to.IsZero() {
		to = time.Now()
	}

	results, err := srv.repository.GetUserProjectStats(user, from, to, limit, offset)
	if err == nil {
		srv.cache.Set(cacheKey, results, 12*time.Hour)
	}

	go srv.populateUniqueUserProjects(user.ID)

	return results, err
}

// GetUserAgentsByUser returns a list of all user agents that have been recorded for the given user.
func (srv *HeartbeatService) GetUserAgentsByUser(user *models.User) ([]*models.UserAgent, error) {
	userAgents, err := srv.repository.GetUserAgentsByUser(user)
	if err != nil {
		return nil, err
	}
	for _, ua := range userAgents {
		ua.WithId()
	}
	return userAgents, nil
}

func (srv *HeartbeatService) augmented(heartbeats []*models.Heartbeat, userId string) ([]*models.Heartbeat, error) {
	languageMapping, err := srv.languageMappingSrvc.ResolveByUser(userId)
	if err != nil {
		return nil, err
	}
	for i := range heartbeats {
		heartbeats[i].Augment(languageMapping)
	}
	return heartbeats, nil
}

func (srv *HeartbeatService) augmentedAsync(in chan *models.Heartbeat, languageMapping map[string]string) (chan *models.Heartbeat, error) {
	// if this method made the query to fetch langauge mapping itself, it would produce a dead loop in case there are less than 2 database connections
	out := make(chan *models.Heartbeat)
	go func(in, out chan *models.Heartbeat) {
		defer close(out)
		for hb := range in {
			hb.Augment(languageMapping)
			out <- hb
		}
	}(in, out)
	return out, nil
}

func (srv *HeartbeatService) getEntityUserCacheKey(entityType uint8, userId string) string {
	return fmt.Sprintf("entity_set_%d_%s", entityType, userId)
}

func (srv *HeartbeatService) getUserProjectsCacheKey(userId string) string {
	return fmt.Sprintf("unique_projects_%s", userId)
}

func (srv *HeartbeatService) getUserFirstCacheKey(userId string) string {
	return fmt.Sprintf("user_first_%s", userId)
}

func (srv *HeartbeatService) getUserLastCacheKey(userId string) string {
	return fmt.Sprintf("user_last_%s", userId)
}

func (srv *HeartbeatService) updateEntityUserCache(entityType uint8, entityKey string, userId string) {
	cacheKey := srv.getEntityUserCacheKey(entityType, userId)
	if entities, found := srv.cache.Get(cacheKey); found {
		entitySet := entities.(datastructure.Set[string])

		srv.entityCacheLock.Lock()
		defer srv.entityCacheLock.Unlock()

		if !entitySet.Contain(entityKey) {
			entitySet.Add(entityKey)
			// new project / language / ..., which is not yet present in cache, arrived as part of a heartbeats
			// -> update cache instead of just invalidating it, because rebuilding is expensive here
			srv.cache.Set(cacheKey, entitySet, cache.NoExpiration)
		}
	}
}

func (srv *HeartbeatService) updateEntityUserCacheByHeartbeat(hb *models.Heartbeat) {
	go srv.updateEntityUserCache(models.SummaryProject, hb.Project, hb.UserID)
	go srv.updateEntityUserCache(models.SummaryLanguage, hb.Language, hb.UserID)
	go srv.updateEntityUserCache(models.SummaryEditor, hb.Editor, hb.UserID)
	go srv.updateEntityUserCache(models.SummaryOS, hb.OperatingSystem, hb.UserID)
	go srv.updateEntityUserCache(models.SummaryMachine, hb.Machine, hb.UserID)
	go srv.updateEntityUserCache(models.SummaryBranch, hb.Branch, hb.UserID)
	go srv.updateEntityUserCache(models.SummaryEntity, hb.Entity, hb.UserID)
	go srv.updateEntityUserCache(models.SummaryCategory, hb.Category, hb.UserID)
}

func (srv *HeartbeatService) notifyBatch(heartbeats []*models.Heartbeat) {
	for _, hb := range heartbeats {
		srv.eventBus.Publish(hub.Message{
			Name:   config.EventHeartbeatCreate,
			Fields: map[string]interface{}{config.FieldPayload: hb},
		})
	}
}

func (srv *HeartbeatService) countByUserCacheKey(userId string) string {
	return fmt.Sprintf("%s--hearbeat-count", userId)
}

func (srv *HeartbeatService) countTotalCacheKey() string {
	return "heartbeat-count"
}

func (srv *HeartbeatService) countCacheTtl() time.Duration {
	return time.Duration(srv.config.App.CountCacheTTLMin) * time.Minute
}

func (srv *HeartbeatService) filtersToColumnMap(filters *models.Filters) map[string][]string {
	columnMap := map[string][]string{}
	for _, t := range models.NativeSummaryTypes() {
		f := filters.ResolveType(t)
		if len(*f) > 0 {
			columnMap[models.GetEntityColumn(t)] = *f
		}
	}
	return columnMap
}

func (srv *HeartbeatService) populateUniqueUserProjects(userId string) {
	userProjectsCacheKey := srv.getUserProjectsCacheKey(userId)
	if _, found := srv.cache.Get(userProjectsCacheKey); !found {
		projects, _ := srv.GetEntitySetByUser(models.SummaryProject, userId)
		srv.cache.Set(userProjectsCacheKey, datastructure.New[string](projects...), cache.NoExpiration)
	}
}

func (srv *HeartbeatService) checkInvalidateProjectStatsCache(newHeartbeat *models.Heartbeat) {
	// checks the cache of unique projects and clears the user's project_stats_* cache items if the new heartbeat is for a new, unseen project
	var invalidated bool
	if uniqueProjects, found := srv.cache.Get(srv.getUserProjectsCacheKey(newHeartbeat.UserID)); found && !uniqueProjects.(datastructure.Set[string]).Contain(newHeartbeat.Project) {
		for _, k := range maputil.Keys[string, cache.Item](srv.cache.Items()) {
			if strings.HasPrefix(k, fmt.Sprintf("project_stats_%s_", newHeartbeat.UserID)) {
				srv.cache.Delete(k)
				invalidated = true
			}
		}
	}
	if invalidated {
		go srv.populateUniqueUserProjects(newHeartbeat.UserID)
	}
}

func (srv *HeartbeatService) checkInvalidateRangeCache(newHeartbeat *models.Heartbeat) {
	keyFirst, keyLast := srv.getUserFirstCacheKey(newHeartbeat.UserID), srv.getUserLastCacheKey(newHeartbeat.UserID)

	first, found := srv.cache.Get(keyFirst)
	if found && newHeartbeat.Time.T().Before(first.(time.Time)) {
		srv.cache.Delete(keyFirst)
	}

	last, found := srv.cache.Get(keyLast)
	if found && newHeartbeat.Time.T().After(last.(time.Time)) {
		srv.cache.Delete(keyLast)
	}
}
